/*
 * Copyright (c) 2015-2999 广州朗尊软件科技有限公司<www.legendshop.cn> All rights reserved.
 *
 * https://www.legendshop.cn/
 *
 * 版权所有,并保留所有权利
 *
 */
package com.sankuai.inf.leaf.segment;

import cn.legendshop.jpaplus.model.Result;
import cn.legendshop.jpaplus.model.Status;
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
import com.sankuai.inf.leaf.segment.model.LeafAlloc;
import com.sankuai.inf.leaf.segment.model.Segment;
import com.sankuai.inf.leaf.segment.model.SegmentBuffer;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author legendshop
 */
/**
 * {@link IDGen} implementation that hands out IDs from in-memory segments which are
 * reserved in the database through an {@link IDAllocDao}.
 *
 * <p>Each key (tag) owns one {@link SegmentBuffer} holding two {@link Segment}s. IDs are served
 * from the current segment while the other one is loaded by a background task; when the current
 * segment is exhausted and the other one is ready, the buffer switches over to it.
 *
 * <p>A {@link IDAllocDao} must be supplied through {@link #setDao(IDAllocDao)} and
 * {@link #init()} must be called before {@link #get(String)} returns usable IDs.
 *
 * @author legendshop
 */
public class SegmentIDGenImpl implements IDGen {
	private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);

	/**
	 * Error code returned when the ID cache has not been initialized successfully.
	 */
	private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
	/**
	 * Error code returned when the requested key does not exist.
	 */
	private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
	/**
	 * Error code returned when neither of the two segments in a SegmentBuffer has been loaded from the DB.
	 */
	private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
	/**
	 * Upper bound for the dynamically adjusted step: 1,000,000.
	 */
	private static final int MAX_STEP = 1000000;
	/**
	 * Reference lifetime of one segment (15 minutes), used to decide whether to grow or shrink the step.
	 */
	private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
	/**
	 * Executor that runs the background "load next segment" tasks.
	 */
	private final ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
	private volatile boolean initOK = false;
	/**
	 * Key (tag) to buffer mapping; refreshed from the DB by {@link #updateCacheFromDb()}.
	 */
	private final Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
	private IDAllocDao dao;

	/**
	 * Thread factory that names the segment-update worker threads
	 * {@code Thread-Segment-Update-<n>} with a process-wide running number.
	 */
	public static class UpdateThreadFactory implements ThreadFactory {

		private static int threadInitNumber = 0;

		private static synchronized int nextThreadNum() {
			return threadInitNumber++;
		}

		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
		}
	}

	/**
	 * Loads the tags from the DB once, marks the generator as initialized and schedules the
	 * periodic cache refresh.
	 *
	 * @return {@code true} once initialization has run
	 */
	@Override
	public boolean init() {
		logger.info("Init ...");
		// Load the tags into the cache before reporting the generator as initialized.
		updateCacheFromDb();
		initOK = true;
		updateCacheFromDbAtEveryMinute();
		return initOK;
	}

	/**
	 * Starts a single daemon thread that calls {@link #updateCacheFromDb()} every 60 seconds
	 * (fixed delay, first run after 60 seconds).
	 */
	private void updateCacheFromDbAtEveryMinute() {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread t = new Thread(r);
				t.setName("check-idCache-thread");
				t.setDaemon(true);
				return t;
			}
		});
		scheduler.scheduleWithFixedDelay(new Runnable() {
			@Override
			public void run() {
				updateCacheFromDb();
			}
		}, 60, 60, TimeUnit.SECONDS);
	}

	/**
	 * Synchronizes the set of cached tags with the tags stored in the DB: tags that are new in
	 * the DB get an empty, not yet initialized buffer; cached tags that are no longer in the DB
	 * are removed. If the DB returns no tags the cache is left untouched. Any exception is
	 * logged and swallowed so that the periodic refresh keeps running.
	 */
	private void updateCacheFromDb() {
		logger.info("update cache from db");
		StopWatch sw = new Slf4JStopWatch();
		try {
			List<String> dbTags = dao.getAllTags();
			if (dbTags == null || dbTags.isEmpty()) {
				return;
			}
			Set<String> dbTagSet = new HashSet<String>(dbTags);
			// Snapshot taken before any insert, so freshly added tags are never candidates for removal.
			Set<String> cachedTags = new HashSet<String>(cache.keySet());

			// Add tags that are new in the DB to the cache.
			Set<String> tagsToInsert = new HashSet<String>(dbTagSet);
			tagsToInsert.removeAll(cachedTags);
			for (String tag : tagsToInsert) {
				SegmentBuffer buffer = new SegmentBuffer();
				buffer.setKey(tag);
				Segment segment = buffer.getCurrent();
				segment.setValue(new AtomicLong(0));
				segment.setMax(0);
				segment.setStep(0);
				cache.put(tag, buffer);
				logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
			}

			// Drop cached tags that no longer exist in the DB.
			Set<String> tagsToRemove = new HashSet<String>(cachedTags);
			tagsToRemove.removeAll(dbTagSet);
			for (String tag : tagsToRemove) {
				cache.remove(tag);
				logger.info("Remove tag {} from IdCache", tag);
			}
		} catch (Exception e) {
			logger.warn("update cache from db exception", e);
		} finally {
			sw.stop("updateCacheFromDb");
		}
	}

	/**
	 * Returns the next ID for the given key.
	 *
	 * @param key the tag to allocate an ID for
	 * @return a {@link Status#SUCCESS} result carrying the ID, or a {@link Status#EXCEPTION}
	 *         result carrying one of the negative error codes: {@code -1} when {@link #init()}
	 *         has not completed, {@code -2} when the key is {@code null} or not cached,
	 *         {@code -3} when no segment could be loaded
	 */
	@Override
	public Result get(final String key) {
		if (!initOK) {
			return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
		}
		// Single lookup: the periodic refresh may remove the key concurrently, so a
		// containsKey()/get() pair could observe the key and then read null.
		SegmentBuffer buffer = key == null ? null : cache.get(key);
		if (buffer == null) {
			return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
		}
		if (!buffer.isInitOk()) {
			synchronized (buffer) {
				// Re-check under the monitor so only one caller loads the first segment.
				if (!buffer.isInitOk()) {
					try {
						updateSegmentFromDb(key, buffer.getCurrent());
						logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
						buffer.setInitOk(true);
					} catch (Exception e) {
						logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
					}
				}
			}
		}
		return getIdFromSegmentBuffer(buffer);
	}

	/**
	 * Reserves a new ID range in the DB for {@code key} and stores it in {@code segment}.
	 *
	 * <p>While the owning buffer is not initialized, or has never recorded an update timestamp,
	 * the step configured in the DB is used. Afterwards the step is adapted to how long the
	 * previous segment lasted: it is doubled (up to {@link #MAX_STEP}) when the segment lasted
	 * less than {@link #SEGMENT_DURATION}, kept when it lasted less than twice that, and halved
	 * (but not below the buffer's minimum step) otherwise.
	 *
	 * @param key     the tag whose max id is advanced in the DB
	 * @param segment the segment to fill; its owning buffer's step, min step and timestamp are updated too
	 */
	public void updateSegmentFromDb(String key, Segment segment) {
		StopWatch sw = new Slf4JStopWatch();
		SegmentBuffer buffer = segment.getBuffer();
		LeafAlloc leafAlloc;
		if (!buffer.isInitOk()) {
			leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
			buffer.setStep(leafAlloc.getStep());
			// The step in leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		} else if (buffer.getUpdateTimestamp() == 0) {
			leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
			buffer.setUpdateTimestamp(System.currentTimeMillis());
			buffer.setStep(leafAlloc.getStep());
			// The step in leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		} else {
			long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
			int nextStep = buffer.getStep();
			if (duration < SEGMENT_DURATION) {
				// Equivalent to "nextStep * 2 <= MAX_STEP" but cannot overflow int.
				if (nextStep <= MAX_STEP / 2) {
					nextStep = nextStep * 2;
				}
			} else if (duration >= SEGMENT_DURATION * 2) {
				nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
			}
			// Between one and two segment durations the step is left unchanged.
			logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);
			LeafAlloc temp = new LeafAlloc();
			temp.setKey(key);
			temp.setStep(nextStep);
			leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
			buffer.setUpdateTimestamp(System.currentTimeMillis());
			buffer.setStep(nextStep);
			// The step in leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		}
		// must set value before set max
		long value = leafAlloc.getMaxId() - buffer.getStep();
		segment.getValue().set(value);
		segment.setMax(leafAlloc.getMaxId());
		segment.setStep(buffer.getStep());
		sw.stop("updateSegmentFromDb", key + " " + segment);
	}

	/**
	 * Takes the next ID from the buffer's current segment, triggering a background load of the
	 * other segment when the current one runs low and switching segments when it is exhausted.
	 *
	 * @param buffer the buffer to allocate from
	 * @return a {@link Status#SUCCESS} result with the ID, or a {@link Status#EXCEPTION} result
	 *         with code {@code -3} when the current segment is exhausted and the next one is not ready
	 */
	public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
		while (true) {
			buffer.rLock().lock();
			try {
				final Segment segment = buffer.getCurrent();
				// Start at most one background load once fewer than 90% of the step remain idle.
				if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
					service.execute(new Runnable() {
						@Override
						public void run() {
							Segment next = buffer.getSegments()[buffer.nextPos()];
							boolean updateOk = false;
							try {
								updateSegmentFromDb(buffer.getKey(), next);
								updateOk = true;
								logger.info("update segment {} from db {}", buffer.getKey(), next);
							} catch (Exception e) {
								logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
							} finally {
								if (updateOk) {
									buffer.wLock().lock();
									try {
										buffer.setNextReady(true);
										buffer.getThreadRunning().set(false);
									} finally {
										buffer.wLock().unlock();
									}
								} else {
									// Release the flag on failure as well; otherwise no later
									// caller could ever start another load for this buffer.
									buffer.getThreadRunning().set(false);
								}
							}
						}
					});
				}
				long value = segment.getValue().getAndIncrement();
				if (value < segment.getMax()) {
					return new Result(value, Status.SUCCESS);
				}
			} finally {
				buffer.rLock().unlock();
			}
			// Current segment is exhausted: give a running background load time to finish.
			waitAndSleep(buffer);
			buffer.wLock().lock();
			try {
				final Segment segment = buffer.getCurrent();
				// Another thread may already have switched segments while we were waiting.
				long value = segment.getValue().getAndIncrement();
				if (value < segment.getMax()) {
					return new Result(value, Status.SUCCESS);
				}
				if (buffer.isNextReady()) {
					buffer.switchPos();
					buffer.setNextReady(false);
				} else {
					logger.error("Both two segments in {} are not ready!", buffer);
					return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
				}
			} finally {
				buffer.wLock().unlock();
			}
		}
	}

	/**
	 * Blocks while a background segment load is running for {@code buffer}, polling every 10 ms.
	 * Gives up with a warning on the 1000th poll, or immediately when the calling thread is
	 * interrupted (the interrupt flag is restored for the caller).
	 */
	private void waitAndSleep(SegmentBuffer buffer) {
		int roll = 0;
		while (buffer.getThreadRunning().get()) {
			roll += 1;
			if (roll >= 1000) {
				logger.warn("Buffer key {} can not stop in given time ", buffer.getKey());
				break;
			}
			try {
				logger.debug("Buffer key {} is still being updated, sleep 10 ms", buffer.getKey());
				TimeUnit.MILLISECONDS.sleep(10);
			} catch (InterruptedException e) {
				// Preserve the interrupt so callers further up the stack can react to it.
				Thread.currentThread().interrupt();
				logger.warn("Thread {} Interrupted", Thread.currentThread().getName());
				break;
			}
		}
	}

	/**
	 * @return all allocation records as returned by the DAO
	 */
	public List<LeafAlloc> getAllLeafAllocs() {
		return dao.getAllLeafAllocs();
	}

	/**
	 * @return the live key-to-buffer cache (not a copy)
	 */
	public Map<String, SegmentBuffer> getCache() {
		return cache;
	}

	public IDAllocDao getDao() {
		return dao;
	}

	public void setDao(IDAllocDao dao) {
		this.dao = dao;
	}
}
